{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Count Min Sketch\n",
    "\n",
    "Twitter users use free-form hashtags to indicate a tweet is attached to an idea or a cause. E.g. #tacotuesday #firstworldproblems #metoo etc.\n",
    "\n",
    "A typical stream runs at 6-10k tweets/second throughout the day, with spikes 10x larger, and a single tweet can carry anywhere from zero to tens of hashtags.\n",
    "\n",
    "Imagine we want to estimate the rank of hashtags for multiple timescales."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### #WednesdayWisdom Example Stream\n",
    "\n",
    "<img src=\"files/count_min_0.png\">"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Strategy 1: Keep exact count\n",
    "\n",
    "Build a data structure that:\n",
    "\n",
    "* represents all unique values ($i_t$) of the hashtags\n",
    "* counts the occurrences ($c_t$)\n",
    "* keeps track of the time period of each occurrence ($c_t^\\Delta$)\n",
    "\n",
    "The size of this structure is (the cardinality of the stream, i.e. the number of unique hashtags in time period $\\Delta$) x (the number of time periods we need to keep to make our rankings).\n",
    "\n",
    "Computation time is determined by:\n",
    "\n",
    "* key lookup ($O(1)$)\n",
    "* counter increment by 1\n",
    "\n",
    "So this solution is fast, but its memory grows linearly with cardinality."
   ]
  },
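  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A minimal sketch of the exact-count structure described above, assuming the stream arrives as `(timestamp, hashtag)` pairs and that a time period is a fixed-width bucket of seconds. The names `exact_counts` and `period_seconds` are hypothetical and do not appear elsewhere in this notebook.\n",
    "\n",
    "```python\n",
    "from collections import Counter, defaultdict\n",
    "\n",
    "def exact_counts(stream, period_seconds=60):\n",
    "    # one Counter per time period: counts[period][hashtag] -> occurrences\n",
    "    counts = defaultdict(Counter)\n",
    "    for timestamp, hashtag in stream:\n",
    "        period = int(timestamp // period_seconds)\n",
    "        counts[period][hashtag] += 1  # O(1) lookup plus increment\n",
    "    return counts\n",
    "\n",
    "# hypothetical toy stream of (seconds, hashtag) pairs\n",
    "stream = [(1, '#metoo'), (5, '#tacotuesday'), (30, '#metoo'), (61, '#metoo')]\n",
    "counts = exact_counts(stream)\n",
    "top_first_minute = counts[0].most_common(1)\n",
    "total_counters = sum(len(c) for c in counts.values())\n",
    "```\n",
    "\n",
    "Here `total_counters` is the number of (period, hashtag) pairs stored, which is the quantity that grows with the cardinality and with the number of periods kept."
   ]
  },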
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Strategy 2: Keep approximate count\n",
    "\n",
    "Come up with a data structure that does not grow linearly with cardinality and gives a bounded-error estimate of the ranking."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Basic Idea of Count Min sketch\n",
    "\n",
    "The strategy:\n",
    "\n",
    "* Map the input value ($i_t$) to _multiple_ points in a _relatively small_ output space, and increment each of those locations when we see that item.\n",
    "* In this strategy, the count associated with a given input will be applied to multiple counts in the output space. \n",
    "* Collisions will occur. That is, different values may sometimes increment the same cell.\n",
    "* But, the _minimum_ count associated with a given input will have some desirable properties, including the ability to be used to estimate the largest N counts.\n",
    "\n",
    "<img src=\"files/count_min_2.png\">\n",
    "\n",
    "http://debasishg.blogspot.com/2014/01/count-min-sketch-data-structure-for.html"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Assume we have $d$ pairwise-independent hash functions $\\{h_1, \\ldots, h_d\\}$ that each hash our inputs to the range $\\{1, \\ldots, w\\}$.\n",
    "\n",
    "Parameters of the sketch:\n",
    "\n",
    "*   $\\epsilon$\n",
    "*   $\\delta$\n",
    "\n",
    "These parameters are inversely and exponentially (respectively) related to the sketch size parameters, $w$ and $d$: $w$ grows like $1/\\epsilon$ and $d$ grows like $\\ln(1/\\delta)$.\n",
    "\n",
    "The reason to parameterize the data structure by $\\epsilon$ and $\\delta$ is that the error in a count estimate is at most $\\epsilon$ times the total count of the stream, with probability at least $1 - \\delta$."
   ]
  },
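  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a worked example, taking the sizing rules used by the implementation later in this notebook, $w = \\lceil e/\\epsilon \\rceil$ and $d = \\lceil \\ln(1/\\delta) \\rceil$, the parameters $\\epsilon = 0.1$ and $\\delta = 10^{-3}$ give:\n",
    "\n",
    "$$w = \\lceil e / 0.1 \\rceil = \\lceil 27.18\\ldots \\rceil = 28, \\qquad d = \\lceil \\ln(10^{3}) \\rceil = \\lceil 6.907\\ldots \\rceil = 7$$\n",
    "\n",
    "That is $7 \\times 28 = 196$ counters, regardless of how many distinct items the stream contains."
   ]
  },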
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Strategy 1: Implementation of exact counting"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import operator\n",
    "\n",
    "# define a function to return a list of the exact top users, sorted by count\n",
    "def exact_top_users(f, top_n = 10):\n",
    "    counts = {}\n",
    "    for user in f:\n",
    "        user = user.rstrip('\\n')\n",
    "        # one dictionary entry per distinct user, incremented on every occurrence\n",
    "        counts[user] = counts.get(user, 0) + 1\n",
    "    print(\"exact data structure counters = {}\".format(len(counts)))\n",
    "    counter = 0\n",
    "    results = []\n",
    "    # walk the users from the largest count to the smallest\n",
    "    for user,count in reversed(sorted(counts.items(), key=operator.itemgetter(1))):\n",
    "        if counter >= top_n:\n",
    "            break\n",
    "        results.append('{},{}'.format(user,str(count)))\n",
    "        counter += 1\n",
    "    return results\n",
    "# note that the output format is '[user],[count]'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Strategy 2: Implementation of the CM sketch"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sys\n",
    "import random\n",
    "import numpy as np\n",
    "import heapq\n",
    "import json\n",
    "import time\n",
    "\n",
    "BIG_PRIME = 9223372036854775783\n",
    "\n",
    "def random_parameter():\n",
    "    return random.randrange(0, BIG_PRIME - 1)\n",
    "\n",
    "\n",
    "class Sketch:\n",
    "    def __init__(self, delta, epsilon, k):\n",
    "        \"\"\"\n",
    "        Set up a new count-min sketch with parameters delta, epsilon and k\n",
    "\n",
    "        The parameters delta and epsilon control the accuracy of the\n",
    "        estimates of the sketch\n",
    "\n",
    "        Cormode and Muthukrishnan prove that for an item i with count a_i, the\n",
    "        estimate from the sketch a_hat_i will satisfy the relation\n",
    "\n",
    "        a_hat_i <= a_i + epsilon * ||a||_1\n",
    "\n",
    "        with probability at least 1 - delta, where a is the vector of all\n",
    "        counts and ||x||_1 is the L1 norm of a vector x\n",
    "\n",
    "        Parameters\n",
    "        ----------\n",
    "        delta : float\n",
    "            A value in the unit interval that sets the probability that an\n",
    "            estimate exceeds the error bound\n",
    "        epsilon : float\n",
    "            A value in the unit interval that sets the size of the error\n",
    "            bound, as a fraction of the total count\n",
    "        k : int\n",
    "            A positive integer that sets the number of top items counted\n",
    "\n",
    "        Examples\n",
    "        --------\n",
    "        >>> s = Sketch(10**-7, 0.005, 40)\n",
    "\n",
    "        Raises\n",
    "        ------\n",
    "        ValueError\n",
    "            If delta or epsilon are not in the unit interval, or if k is\n",
    "            not a positive integer\n",
    "\n",
    "        \"\"\"\n",
    "        if delta <= 0 or delta >= 1:\n",
    "            raise ValueError(\"delta must be between 0 and 1, exclusive\")\n",
    "        if epsilon <= 0 or epsilon >= 1:\n",
    "            raise ValueError(\"epsilon must be between 0 and 1, exclusive\")\n",
    "        if k < 1:\n",
    "            raise ValueError(\"k must be a positive integer\")\n",
    "\n",
    "        self.w = int(np.ceil(np.exp(1) / epsilon))\n",
    "        self.d = int(np.ceil(np.log(1 / delta)))\n",
    "        print(\"cm data Structure: {} hashes x {} cells = {} counters\".format(self.d, self.w, self.d*self.w))\n",
    "        self.k = k\n",
    "        self.hash_functions = [self.__generate_hash_function() for i in range(self.d)]\n",
    "        self.count = np.zeros((self.d, self.w), dtype='int32')\n",
    "        self.heap, self.top_k = [], {} # top_k => [estimate, key] pairs\n",
    "\n",
    "    def update(self, key, increment):\n",
    "        \"\"\"\n",
    "        Updates the sketch for the item with name of key by the amount\n",
    "        specified in increment\n",
    "\n",
    "        Parameters\n",
    "        ----------\n",
    "        key : string\n",
    "            The item to update the value of in the sketch\n",
    "        increment : integer\n",
    "            The amount to update the sketch by for the given key\n",
    "\n",
    "        Examples\n",
    "        --------\n",
    "        >>> s = Sketch(10**-7, 0.005, 40)\n",
    "        >>> s.update('http://www.cnn.com/', 1)\n",
    "\n",
    "        \"\"\"\n",
    "        for row, hash_function in enumerate(self.hash_functions):\n",
    "            column = hash_function(abs(hash(key)))\n",
    "            self.count[row, column] += increment\n",
    "\n",
    "        self.update_heap(key)\n",
    "\n",
    "    def update_heap(self, key):\n",
    "        \"\"\"\n",
    "        Updates the class's heap that keeps track of the top k items for a\n",
    "        given key\n",
    "\n",
    "        For the given key, it checks whether the key is present in the heap,\n",
    "        updating accordingly if so, and adding it to the heap if it is\n",
    "        absent\n",
    "\n",
    "        Parameters\n",
    "        ----------\n",
    "        key : string\n",
    "            The item to check against the heap\n",
    "\n",
    "        \"\"\"\n",
    "        estimate = self.get(key)\n",
    "\n",
    "        if key in self.top_k:\n",
    "            # the same list object lives in the heap, so update it in place\n",
    "            self.top_k[key][0] = estimate\n",
    "            heapq.heapify(self.heap)\n",
    "        elif len(self.top_k) < self.k:\n",
    "            # room left: always admit the key, sharing one pair object\n",
    "            pair = [estimate, key]\n",
    "            heapq.heappush(self.heap, pair)\n",
    "            self.top_k[key] = pair\n",
    "        elif estimate >= self.heap[0][0]:\n",
    "            new_pair = [estimate, key]\n",
    "            old_pair = heapq.heappushpop(self.heap, new_pair)\n",
    "            # only record the key if it actually displaced another entry\n",
    "            if old_pair is not new_pair:\n",
    "                del self.top_k[old_pair[1]]\n",
    "                self.top_k[key] = new_pair\n",
    "\n",
    "    def get(self, key):\n",
    "        \"\"\"\n",
    "        Fetches the sketch estimate for the given key\n",
    "\n",
    "        Parameters\n",
    "        ----------\n",
    "        key : string\n",
    "            The item to produce an estimate for\n",
    "\n",
    "        Returns\n",
    "        -------\n",
    "        estimate : int\n",
    "            The best estimate of the count for the given key based on the\n",
    "            sketch\n",
    "\n",
    "        Examples\n",
    "        --------\n",
    "        >>> s = Sketch(10**-7, 0.005, 40)\n",
    "        >>> s.update('http://www.cnn.com/', 1)\n",
    "        >>> s.get('http://www.cnn.com/')\n",
    "        1\n",
    "\n",
    "        \"\"\"\n",
    "        value = sys.maxsize\n",
    "        for row, hash_function in enumerate(self.hash_functions):\n",
    "            column = hash_function(abs(hash(key)))\n",
    "            value = min(self.count[row, column], value)\n",
    "\n",
    "        return value\n",
    "\n",
    "    def __generate_hash_function(self):\n",
    "        \"\"\"\n",
    "        Returns a hash function from a family of pairwise-independent hash\n",
    "        functions\n",
    "\n",
    "        \"\"\"\n",
    "        a, b = random_parameter(), random_parameter()\n",
    "        return lambda x: (a * x + b) % BIG_PRIME % self.w"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "! head CM_small.txt\n",
    "! cat CM_small.txt | sort | uniq -c | sort -n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "f = open('CM_small.txt')\n",
    "results_exact = sorted(exact_top_users(f))\n",
    "print(\"\\n\".join(results_exact))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# define a function to return a list of the estimated top users, sorted by count\n",
    "def CM_top_users(f, s, top_n = 10):\n",
    "    for user_name in f:\n",
    "        s.update(user_name.rstrip('\\n'),1)\n",
    "    \n",
    "    results = []\n",
    "    counter = 0\n",
    "    for value in reversed(sorted(s.top_k.values())):\n",
    "        if counter >= top_n:\n",
    "            break\n",
    "        results.append('{1},{0}'.format(str(value[0]),str(value[1])))\n",
    "        counter += 1\n",
    "    return results\n",
    "# note that the output format is '[user],[count]'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# instantiate a Sketch object\n",
    "s = Sketch(10**-3, 0.1, 10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "f = open('CM_small.txt')\n",
    "results_CM = sorted(CM_top_users(f,s))\n",
    "print(\"\\n\".join(results_CM))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "for item in zip(results_exact,results_CM):\n",
    "    print(item)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Is it possible to make the sketch so coarse that its estimates are wrong even for this data set?\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "s = Sketch(0.9, 0.9, 10)\n",
    "f = open('CM_small.txt')\n",
    "results_coarse_CM = CM_top_users(f,s)\n",
    "print(\"\\n\".join(results_coarse_CM))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Yes, if you try enough times (the hash functions are chosen at random). Why?\n",
    "\n",
    "* The 'w' parameter goes like $\\lceil e/\\epsilon \\rceil$, which is always >= 3.\n",
    "* The 'd' parameter goes like $\\lceil \\ln(1/\\delta) \\rceil$, which is always >= 1.\n",
    "\n",
    "So, you're dealing with a space with minimum size 3 x 1. With 10 records, it's possible that all 4 users map their counts to the same cell. So it's possible to see an estimate as high as 10, in this case."
   ]
  },
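  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For the coarse sketch above, `Sketch(0.9, 0.9, 10)`, the sizing rules in `Sketch.__init__` work out as follows (a worked check, not output captured from the notebook):\n",
    "\n",
    "$$w = \\lceil e / 0.9 \\rceil = \\lceil 3.02\\ldots \\rceil = 4, \\qquad d = \\lceil \\ln(1/0.9) \\rceil = \\lceil 0.105\\ldots \\rceil = 1$$\n",
    "\n",
    "With a single row there is no minimum to take across hash functions, so any collision in that row inflates the estimate directly."
   ]
  },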
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Now for a larger data set..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "! wc -l CM_large.txt\n",
    "! cat CM_large.txt | sort | uniq | wc -l\n",
    "! cat CM_large.txt | sort | uniq -c | sort -rn"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "f = open('CM_large.txt')\n",
    "%time results_exact = exact_top_users(f)\n",
    "print(\"\\n\".join(results_exact))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# this could take a few minutes\n",
    "f = open('CM_large.txt')\n",
    "s = Sketch(10**-4, 10**-4, 10)\n",
    "%time results_CM = CM_top_users(f,s)\n",
    "print(\"\\n\".join(results_CM))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For this precision and dataset size, the CM sketch takes _much_ longer than the exact solution, because every insert evaluates all of the hash functions and then updates the heap. In fact, the CM sketch only matches the run time of the exact solution at reasonable accuracy once the number of entries is very large."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "for item in zip(results_exact,results_CM):\n",
    "    print(item)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# the CM sketch gets the top entry (an outlier) correct but doesn't do well \n",
    "# estimating the order of the more degenerate counts\n",
    "\n",
    "# let's decrease the precision via both the epsilon and delta parameters, \n",
    "# and see whether it still gets the \"heavy-hitter\" correct\n",
    "\n",
    "f = open('CM_large.txt')\n",
    "s = Sketch(10**-3, 10**-2, 50)\n",
    "%time results_CM = CM_top_users(f,s)\n",
    "print(\"\\n\".join(results_CM))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# nope...sketch is too coarse, too many collisions, and the prominence of user 'user_0 129' is obscured\n",
    "for item in zip(results_exact,results_CM):\n",
    "    print(item)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The most common use of the CM sketch is analysis of streaming data. Why?\n",
    "\n",
    "* Because the data are arriving in real time, the hashing of the inputs is not a bottleneck as it is when the data are already collected.\n",
    "* Similarly, the recalculation of the top-k list (implemented as a heap, in this case) is done on insert. No need to sort the entire list.\n",
    "* The sketches are mergeable: sketches built with the same dimensions and hash functions can be added cell by cell, so the work can be easily parallelized and the results combined at the end.\n",
    "* The same sketches support other queries, such as estimating the dot product of two count vectors.\n",
    "\n",
    "**Take away**: use the CM sketch to estimate the top-k most frequent elements in a streaming environment."
   ]
  },
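  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The point above about combining sketches after parallel processing can be illustrated with a small, self-contained example. This is only a sketch under stated assumptions: `TinySketch`, its `merge` method, the shared `seed` and the prime `PRIME` are all hypothetical and are not part of the `Sketch` class in this notebook. It uses plain lists instead of numpy and omits the top-k heap.\n",
    "\n",
    "```python\n",
    "import random\n",
    "\n",
    "PRIME = 2**61 - 1  # a Mersenne prime, chosen for this illustration only\n",
    "\n",
    "class TinySketch:\n",
    "    def __init__(self, d, w, seed):\n",
    "        # sketches can only be merged if they share d, w and the hash\n",
    "        # parameters, so the parameters are derived from a shared seed\n",
    "        rng = random.Random(seed)\n",
    "        self.d, self.w = d, w\n",
    "        self.params = [(rng.randrange(1, PRIME), rng.randrange(PRIME)) for _ in range(d)]\n",
    "        self.table = [[0] * w for _ in range(d)]\n",
    "\n",
    "    def _columns(self, key):\n",
    "        x = hash(key) % PRIME\n",
    "        return [((a * x + b) % PRIME) % self.w for a, b in self.params]\n",
    "\n",
    "    def update(self, key, increment=1):\n",
    "        for row, column in enumerate(self._columns(key)):\n",
    "            self.table[row][column] += increment\n",
    "\n",
    "    def get(self, key):\n",
    "        return min(self.table[row][column] for row, column in enumerate(self._columns(key)))\n",
    "\n",
    "    def merge(self, other):\n",
    "        # cell-by-cell addition gives the sketch of the concatenated streams\n",
    "        for row in range(self.d):\n",
    "            for column in range(self.w):\n",
    "                self.table[row][column] += other.table[row][column]\n",
    "\n",
    "stream = ['a', 'b', 'a', 'c', 'a', 'b']\n",
    "left, right, whole = (TinySketch(3, 8, seed=42) for _ in range(3))\n",
    "for key in stream[:3]:\n",
    "    left.update(key)\n",
    "for key in stream[3:]:\n",
    "    right.update(key)\n",
    "for key in stream:\n",
    "    whole.update(key)\n",
    "left.merge(right)\n",
    "same_table = left.table == whole.table\n",
    "```\n",
    "\n",
    "After the merge, `same_table` is true: the two half-stream sketches add up to exactly the sketch of the whole stream, which is what allows the counting to be split across workers."
   ]
  },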
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
